/*** Eclipse Class Decompiler plugin, copyright (c) 2012 Chao Chen (cnfree2000@hotmail.com) ***/
package com.sankuai.xm.kafka.client;

import com.sankuai.xm.kafka.client.exception.ProducerConfigException;
import com.sankuai.xm.kafka.client.factory.DefaultConsumerProcessor;
import com.sankuai.xm.kafka.client.factory.DefaultProducerProcessor;
import com.sankuai.xm.kafka.client.utils.StackTraceUtil;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaClient {
	private static final Logger log = LoggerFactory
			.getLogger(KafkaClient.class);

	public static IProducerProcessor buildProduceFactory(Properties initConfig)
			throws Exception {
		try {
			String topicName = initConfig.get("kafka.topic").toString();
			if (topicName != null) {
				Producer producer = new Producer(new ProducerConfig(initConfig));
				return new DefaultProducerProcessor(producer, topicName);
			}
			log.error("xm-kafka-client, kafka.topic is null.");
		} catch (Exception e) {
			log.error("xm-kafka-client, build producer occur exception : {}.",
					StackTraceUtil.getStackTrace(e));
			throw new ProducerConfigException(e);
		}
		return null;
	}

	public static IConsumerProcessor buildConsumerFactory(Properties initConfig)
			throws Exception {
		String topicName = initConfig.get("kafka.topic").toString();
		try {
			int consumerThreadNum = Integer.parseInt(initConfig.get(
					"consumer.thread.num").toString());
			if ((topicName != null) && (consumerThreadNum > 0)) {
				ConsumerConnector consumer = Consumer
						.createJavaConsumerConnector(new ConsumerConfig(
								initConfig));
				return new DefaultConsumerProcessor(consumer, topicName,
						consumerThreadNum);
			}
			log.error("xm-kafka-client, kafka.topic || kafka.consumer.thread.num is null.");
		} catch (Exception e) {
			log.error(
					"xm-kafka-client, build consumer occur exception, topic={} ",
					topicName, e);
			throw new ProducerConfigException(e);
		}
		return null;
	}
}